import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * Minimal Flink job demonstrating a user-defined table function (UDTF) in SQL.
 *
 * <p>A single {@code (age, name_list)} row is emitted by a custom source, exposed as
 * the view {@code t2}, and joined laterally against the function registered as
 * {@code test2}. The joined rows are printed to standard output.
 */
public class UDTF {

    /**
     * Query joining every row of {@code t2} with the rows produced by {@code test2}
     * for that row's {@code name_list}.
     *
     * <p>The inner-join spelling of the same query would be
     * {@code from t2 a, LATERAL TABLE(test2(a.name_list)) as b(name, name_length)};
     * the {@code LEFT JOIN ... ON TRUE} form is the one actually executed here.
     */
    private static final String LATERAL_JOIN_QUERY =
            "select a.age,b.name,b.name_length from t2 a LEFT JOIN LATERAL TABLE(test2(a.name_list)) as b(name, name_length) ON TRUE";

    /**
     * Builds the pipeline and runs it.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Row is a generic container, so the field types are declared explicitly.
        DataStream<Row> people = env
                .addSource(new SingleRowSource())
                .returns(Types.ROW(Types.INT, Types.STRING));

        tEnv.createTemporaryView("t2", people, "age,name_list");
        // NOTE(review): TestTableFunction is defined outside this file; presumably it
        // splits its argument on the given separator and emits (name, name_length).
        tEnv.registerFunction("test2", new TestTableFunction(","));

        Table exploded = tEnv.sqlQuery(LATERAL_JOIN_QUERY);

        DataStream<Row> output = tEnv.toAppendStream(exploded, Row.class);
        // The sink could additionally be named or pinned to a parallelism of 1,
        // e.g. print().name("Table Functions Print").setParallelism(1).
        output.print();

        env.execute();
    }

    /** Source that emits exactly one {@code (22, "aa,b,cdd,dfsfdg,exxxxx")} row and then returns. */
    private static class SingleRowSource extends RichSourceFunction<Row> {

        @Override
        public void run(SourceContext<Row> ctx) throws Exception {
            ctx.collect(Row.of(22, "aa,b,cdd,dfsfdg,exxxxx"));
        }

        @Override
        public void cancel() {
            // Nothing to interrupt: run() returns right after emitting its single row.
        }
    }
}
